package day05;

import beans.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Flink Table API and SQL: OVER aggregation.
 * <p>
 * Reads sensor readings from a text file, assigns event-time timestamps and watermarks,
 * and runs the same OVER aggregation (count of {@code id}, average of {@code temp}) through
 * both the Table API and SQL: once over a window bounded by a number of rows and once over
 * a window bounded by a time range. Every result is printed as an append stream.
 * <p>
 * Alibaba Cloud documentation on OVER window aggregate functions:
 * https://help.aliyun.com/document_detail/62514.html
 *
 * @author lvbingbing
 * @date 2022-01-20 21:24
 */
public class FlinkTableApi08 {
    /**
     * Program entry point: configures the environment, wires up the OVER-window pipelines
     * and triggers execution.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        int parallelism = 1;
        env.setParallelism(parallelism);
        // 2. Use event time as the stream time characteristic.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // 3. Define the OVER-window pipelines.
        studyOverWindow(env);
        // 4. Trigger program execution.
        env.execute();
    }

    /**
     * OVER aggregation: builds the source table and registers the row-bounded and
     * range-bounded OVER-window queries on it.
     *
     * @param env execution environment
     */
    private static void studyOverWindow(StreamExecutionEnvironment env) {
        // 1. Read the data from a file and assign event-time timestamps and watermarks.
        DataStream<SensorReading> dataStreamSource = env.readTextFile("input/sensor.txt")
                .map(line -> {
                    String[] fields = line.split(",");
                    // valueOf replaces the deprecated boxed constructors new Long(String) / new Double(String).
                    // Only the timestamp needs trim(): Long.valueOf rejects surrounding whitespace,
                    // whereas Double.valueOf already ignores it.
                    return new SensorReading(fields[0], Long.valueOf(fields[1].trim()), Double.valueOf(fields[2]));
                })
                // Watermarks tolerate events arriving up to 2 seconds out of order.
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {

                    private static final long serialVersionUID = -4107255905349616357L;

                    @Override
                    public long extractTimestamp(SensorReading sensorReading) {
                        // Flink expects epoch milliseconds; presumably the source timestamp is in
                        // seconds (hence * 1000) -- confirm against the input data.
                        return sensorReading.getTimestamp() * 1000L;
                    }
                });
        // 2. Create the table environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 3. Derive a Table from the data stream (adding "rt" as the rowtime attribute)
        //    and register it as the temporary view "sensor" for the SQL queries.
        Table dataStreamTable = tableEnv.fromDataStream(dataStreamSource, "id, timestamp as ts, temperature as temp, rt.rowtime");
        tableEnv.createTemporaryView("sensor", dataStreamTable);
        // 4. Windows bounded by a number of rows.
        rowsOverWindow(dataStreamTable, tableEnv);
        // 5. Windows bounded by actual element values (timestamp values).
        rangeOverWindow(dataStreamTable, tableEnv);
    }


    /**
     * OVER window bounded by rows: the current row plus the 2 preceding rows per {@code id}.
     *
     * @param dataStreamTable Table derived from the data stream
     * @param tableEnv        table environment
     */
    private static void rowsOverWindow(Table dataStreamTable, StreamTableEnvironment tableEnv) {
        // 1. Table API
        Table resultTable = dataStreamTable.window(Over.partitionBy("id").orderBy("rt").preceding("2.rows").as("ow"))
                .select("id, rt, id.count over ow, temp.avg over ow");
        printAsAppendStream(tableEnv, resultTable, "overDs");
        // 2. SQL
        String sql = "select id, rt, count(id) over ow, avg(temp) over ow " +
                "from sensor " +
                "window ow as (partition by id order by rt rows between 2 preceding and current row)";
        Table sqlQueryTable = tableEnv.sqlQuery(sql);
        printAsAppendStream(tableEnv, sqlQueryTable, "overSqlQueryDs");
    }

    /**
     * OVER window bounded by actual element values (timestamp values): the current row plus
     * all rows of the same {@code id} within the preceding 2 minutes.
     *
     * @param dataStreamTable Table derived from the data stream
     * @param tableEnv        table environment
     */
    private static void rangeOverWindow(Table dataStreamTable, StreamTableEnvironment tableEnv) {
        // 1. Table API
        Table resultTable = dataStreamTable.window(Over.partitionBy("id").orderBy("rt").preceding("2.minutes").as("ow"))
                .select("id, rt, id.count over ow, temp.avg over ow");
        printAsAppendStream(tableEnv, resultTable, "rangeOverDs");
        // 2. SQL
        String sql = "select id, rt, count(id) over ow, avg(temp) over ow " +
                "from sensor " +
                "window ow as (partition by id order by rt range between interval '2' minute preceding and current row)";
        Table sqlQueryTable = tableEnv.sqlQuery(sql);
        printAsAppendStream(tableEnv, sqlQueryTable, "rangeOverSqlQueryDs");
    }

    /**
     * Converts a table into an append stream of {@link Row} and prints it with the given label.
     *
     * @param tableEnv       table environment used for the conversion
     * @param table          table to convert and print
     * @param sinkIdentifier label passed to {@code print} to identify this output
     */
    private static void printAsAppendStream(StreamTableEnvironment tableEnv, Table table, String sinkIdentifier) {
        DataStream<Row> rows = tableEnv.toAppendStream(table, Row.class);
        rows.print(sinkIdentifier);
    }
}
